{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import requests\n",
    "import codecs\n",
    "import re\n",
    "import nltk\n",
    "from pyspark import SparkConf, SparkContext\n",
    "from pyspark.sql import SparkSession, Row\n",
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import StringType\n",
    "from tqdm import tqdm\n",
    "from pyspark.sql.types import StructField, StructType\n",
    "from pyspark import StorageLevel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "#Create SparkConf\n",
    "sparkConf =SparkConf().setAppName('VocabularyExploration').setMaster('local[*]')\n",
    "#Create SparkContext\n",
    "sc=SparkContext(conf=sparkConf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession(sc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_dev_matched = spark.read.option(\"header\", \"true\").csv(\"./data/clean/dev_matched/*\", sep=\"\\t\")\n",
    "df_dev_mismatched = spark.read.option(\"header\", \"true\").csv(\"./data/clean/dev_mismatched/*\", sep=\"\\t\")\n",
    "df_test_matched = spark.read.option(\"header\", \"true\").csv(\"./data/clean/test_matched/*\", sep=\"\\t\")\n",
    "df_test_mismatched = spark.read.option(\"header\", \"true\").csv(\"./data/clean/test_mismatched/*\", sep=\"\\t\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "matched_rdd = df_dev_matched.select([\"sentence1\", \"sentence2\"])\\\n",
    "                        .rdd.map(lambda x: \"{} {}\".format(x.sentence1,x.sentence2)).union(\\\n",
    "df_test_matched.select([\"sentence1\", \"sentence2\"]).\\\n",
    "                        rdd.map(lambda x: \"{} {}\".format(x.sentence1,x.sentence2))\\\n",
    "    ).flatMap(lambda x: x.split(\" \")).\\\n",
    "map(lambda x: (x,1)).reduceByKey(lambda x,y: 1).map(lambda x: x[0])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "mismatched_rdd = df_dev_mismatched.select([\"sentence1\", \"sentence2\"])\\\n",
    "                        .rdd.map(lambda x: \"{} {}\".format(x.sentence1,x.sentence2)).union(\\\n",
    "df_test_mismatched.select([\"sentence1\", \"sentence2\"]).\\\n",
    "                        rdd.map(lambda x: \"{} {}\".format(x.sentence1,x.sentence2))\\\n",
    "    ).flatMap(lambda x: x.split(\" \")).\\\n",
    "map(lambda x: (x,1)).reduceByKey(lambda x,y: 1).map(lambda x: x[0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## first workload\n",
    "\n",
    "1. the number of common words between matched and mismatched sets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[37630]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "matched_rdd.union(mismatched_rdd).map(lambda x: (1,1)).reduceByKey(lambda x,y: x+y).map(lambda x: x[1]).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. the number of words unique to the matched sets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[10740]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "matched_rdd.subtract(mismatched_rdd).map(lambda x: (1,1)).reduceByKey(lambda x,y: x+y).map(lambda x: x[1]).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "3. the number of words unique to the mismatched sets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[8160]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mismatched_rdd.subtract(matched_rdd).map(lambda x: (1,1)).reduceByKey(lambda x,y: x+y).map(lambda x: x[1]).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## second workload"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### consider the corpus without filter by stopwords"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train = spark.read.option(\"header\", \"true\").csv(\"./data/clean/nofilter_train/*\", sep=\"\\t\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd_train = df_train.rdd.map(lambda x: (x.genre,\"{} {}\".format(x.sentence1,x.sentence2)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "def one2many(x):\n",
    "    temp = x[1].split(\" \")\n",
    "    return [(word, x[0]) for word in temp]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd_word2count = rdd_train.reduceByKey(lambda x,y: \"{} {}\".format(x, y)).flatMap(lambda x: one2many(x)).\\\n",
    "    reduceByKey(lambda x,y: \"{} {}\".format(x,y)).map(lambda x: (x[0], set(x[1].split(\" \")))).\\\n",
    "        map(lambda x: (x[0], len(x[1])))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[(1, 0.6396532227666792),\n",
       " (2, 0.13291985059795086),\n",
       " (3, 0.07808198837222584),\n",
       " (4, 0.06094872585637757),\n",
       " (5, 0.0883962124067665)]"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "total = rdd_word2count.count()\n",
    "rdd_word2count.map(lambda x: (x[1], 1)).reduceByKey(lambda x,y: x+y).map(lambda x: (x[0], x[1]/total)).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### consider the corpus filter by stopwords"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train = spark.read.option(\"header\", \"true\").csv(\"./data/clean/train/*\", sep=\"\\t\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd_train = df_train.rdd.map(lambda x: (x.genre,\"{} {}\".format(x.sentence1,x.sentence2)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd_word2count = rdd_train.reduceByKey(lambda x,y: \"{} {}\".format(x, y)).flatMap(lambda x: one2many(x)).\\\n",
    "    reduceByKey(lambda x,y: \"{} {}\".format(x,y)).map(lambda x: (x[0], set(x[1].split(\" \")))).\\\n",
    "        map(lambda x: (x[0], len(x[1])))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[(1, 0.6408088067011489),\n",
       " (2, 0.13315329335835585),\n",
       " (3, 0.07819151370897606),\n",
       " (4, 0.06100379914862453),\n",
       " (5, 0.08684258708289468)]"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "total = rdd_word2count.count()\n",
    "rdd_word2count.map(lambda x: (x[1], 1)).reduceByKey(lambda x,y: x+y).map(lambda x: (x[0], x[1]/total)).collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
